[SPARK-18362][SQL] Use TextFileFormat in implementation of CSVFileFormat #15813
Conversation
Test build #68359 has finished for PR 15813 at commit
    paths = inputPaths,
    className = classOf[TextFileFormat].getName
  ).resolveRelation(checkFilesExist = false))
  .select("value").as[String](Encoders.STRING)
Hi @JoshRosen, I just happened to look at this one and I am just curious. IIUC, the schema from `sparkSession.baseRelationToDataFrame` will always have only a `value` column, not including partitioned columns (it is empty, and `inputPaths` will always be leaf files). So, my question is: is that `.select("value")` used just to doubly make sure? Just curious.
I copied this logic from the `text` method in `DataFrameReader`, so that's where the `value` came from.
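For reference, a minimal sketch of the pattern under discussion, assembled from the diff above. It relies on Spark's internal `DataSource` API as it existed around this PR (Spark 2.x), so treat the exact signatures as assumptions rather than a stable interface; `readLines` is a hypothetical wrapper name, not a method in the patch:

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.text.TextFileFormat

// Build a text-based relation over the already-resolved input paths and
// project the single `value` column into a Dataset[String] for inference.
def readLines(sparkSession: SparkSession, inputPaths: Seq[String]): Dataset[String] =
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = inputPaths,
      className = classOf[TextFileFormat].getName
    ).resolveRelation(checkFilesExist = false))
    .select("value").as[String](Encoders.STRING)
```

Since the text relation produces exactly one `value` column here and no partition columns are involved, the `.select("value")` appears to be mostly defensive, matching the observation above.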
Jenkins, retest this please
Test build #68432 has finished for PR 15813 at commit
Jenkins, retest this please
Test build #68674 has finished for PR 15813 at commit
Test build #68800 has finished for PR 15813 at commit
  if (options.isCommentSet) {
    val comment = options.comment.toString
-   rdd.filter { line =>
+   lines.filter { line =>
Using an untyped `filter` can be more performant here since we don't need to pay for the extra de/serialization costs:
lines.filter(length(trim($"value")) > 0 && !$"value".startsWith(comment))
      line.trim.nonEmpty && !line.startsWith(comment)
    }.first()
  } else {
-   rdd.filter { line =>
+   lines.filter { line =>
Same as above.
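To make the suggestion concrete, a sketch of both forms, assuming `lines: Dataset[String]` with its default `value` column (the method names are illustrative only):

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{length, trim}

// Typed filter: the lambda forces each row to be deserialized to a String
// before the predicate runs.
def firstLineTyped(lines: Dataset[String], comment: String): String =
  lines.filter { line =>
    line.trim.nonEmpty && !line.startsWith(comment)
  }.first()

// Untyped filter: the predicate is a Column expression evaluated on the
// internal row format, skipping the per-row de/serialization step.
def firstLineUntyped(lines: Dataset[String], comment: String): String = {
  import lines.sparkSession.implicits._
  lines
    .filter(length(trim($"value")) > 0 && !$"value".startsWith(comment))
    .first()
}
```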
Any thoughts on modifying
Likely out of scope for this pull request, but if there is a push to migrate from
Test build #69418 has finished for PR 15813 at commit
@NathanHowell, I've gone ahead and removed the JSON changes from this PR; now it only touches CSV and thus should not conflict with your work. @liancheng, want to give this a final review? I've addressed your earlier comments.
Merging in master.
  } else {
    val charset = options.charset
-   sparkSession.sparkContext
-     .hadoopFile[LongWritable, Text, TextInputFormat](location)
+   val rdd = sparkSession.sparkContext
@JoshRosen do you know why the special handling for non-UTF-8 encodings is needed? I would think TextFileFormat itself already supports that, since it is reading it in from Hadoop Text.
I'm not sure; I think this was a carryover from spark-csv.
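For context, this is roughly what that carried-over path does; a sketch under the assumption that the goal is charset-aware decoding (`readWithCharset` is an illustrative name). Hadoop's `Text#toString` always decodes as UTF-8, so a different charset has to be applied to the raw bytes manually:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Read raw line bytes through the legacy Hadoop API and decode each line
// with the user-specified charset instead of Text's implicit UTF-8.
def readWithCharset(spark: SparkSession, location: String, charset: String): RDD[String] =
  spark.sparkContext
    .hadoopFile[LongWritable, Text, TextInputFormat](location)
    .map { case (_, text) => new String(text.getBytes, 0, text.getLength, charset) }
```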
cc @falaki, can you chime in?
## What changes were proposed in this pull request?

This patch significantly improves the IO / file listing performance of schema inference in Spark's built-in CSV data source.

Previously, this data source used the legacy `SparkContext.hadoopFile` and `SparkContext.hadoopRDD` methods to read files during its schema inference step, causing huge file-listing bottlenecks on the driver. This patch refactors this logic to use Spark SQL's `text` data source to read files during this step. The text data source still performs some unnecessary file listing (since in theory we already have resolved the table prior to schema inference and therefore should be able to scan without performing _any_ extra listing), but that listing is much faster and takes place in parallel. In one production workload operating over tens of thousands of files, this change managed to reduce schema inference time from 7 minutes to 2 minutes.

A similar problem also affects the JSON file format and this patch originally fixed that as well, but I've decided to split that change into a separate patch so as not to conflict with changes in another JSON PR.

## How was this patch tested?

Existing unit tests, plus manual benchmarking on a production workload.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#15813 from JoshRosen/use-text-data-source-in-csv-and-json.
…sonDataSource

## What changes were proposed in this pull request?

This PR proposes to use the text datasource for JSON schema inference. This basically proposes the similar approach in apache#15813: if we use a Dataset for the initial loading when inferring the schema, there are advantages. Please refer to SPARK-18362.

It seems the JSON one was supposed to be fixed together but was taken out, according to apache#15813:

> A similar problem also affects the JSON file format and this patch originally fixed that as well, but I've decided to split that change into a separate patch so as not to conflict with changes in another JSON PR.

Also, this seems to affect some functionality because it does not use `FileScanRDD`. This problem is described in SPARK-19885 (but it was CSV's case).

## How was this patch tested?

Existing tests should cover this, plus a manual test with `spark.read.json(path)` and checking the UI.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes apache#17255 from HyukjinKwon/json-filescanrdd.
…a different encoding

### What changes were proposed in this pull request?

This PR proposes to use the text datasource in CSV's schema inference. This shares the same reasons as SPARK-18362, SPARK-19885 and SPARK-19918 - we're currently using a Hadoop RDD when the encoding is different, which is unnecessary. This PR completes SPARK-18362 and addresses the comment at #15813 (comment).

We should better keep the code paths consistent with the existing CSV and JSON datasources as well, but this CSV schema inference with an encoding specified is a case distinct from UTF-8 alone.

There can be another story: this PR might lead to a bug fix. Spark session configurations, say Hadoop configurations, are not respected during CSV schema inference when the encoding is different (they have to be set on the Spark context instead).

### Why are the changes needed?

For consistency, potentially better performance, and fixing a potentially very corner-case bug.

### Does this PR introduce _any_ user-facing change?

Virtually no.

### How was this patch tested?

Existing tests should cover.

Closes #29063 from HyukjinKwon/SPARK-32270.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…aframe read / write API

### What changes were proposed in this pull request?

This PR is a retry of #47328, which replaces an RDD with a Dataset to write SparkR metadata; additionally, this PR removes `repartition(1)`. We actually don't need this when the input is a single row, as it creates only a single partition: https://github.com/apache/spark/blob/e5e751b98f9ef5b8640079c07a9a342ef471d75d/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala#L49-L57

### Why are the changes needed?

In order to leverage the Catalyst optimizer and SQL engine. For example, now we leverage UTF-8 encoding instead of plain JDK ser/de for strings. We have made similar changes in the past, e.g., #29063, #15813, #17255 and SPARK-19918.

Also, we remove `repartition(1)` to avoid an unnecessary shuffle.

With `repartition(1)`:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange SinglePartition, REPARTITION_BY_NUM, [plan_id=6]
   +- LocalTableScan [_1#0]
```

Without `repartition(1)`:

```
== Physical Plan ==
LocalTableScan [_1#2]
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI in this PR should verify the change.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47341 from HyukjinKwon/SPARK-48883-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?

This patch significantly improves the IO / file listing performance of schema inference in Spark's built-in CSV data source.

Previously, this data source used the legacy `SparkContext.hadoopFile` and `SparkContext.hadoopRDD` methods to read files during its schema inference step, causing huge file-listing bottlenecks on the driver.

This patch refactors this logic to use Spark SQL's `text` data source to read files during this step. The text data source still performs some unnecessary file listing (since in theory we already have resolved the table prior to schema inference and therefore should be able to scan without performing any extra listing), but that listing is much faster and takes place in parallel. In one production workload operating over tens of thousands of files, this change managed to reduce schema inference time from 7 minutes to 2 minutes.

A similar problem also affects the JSON file format and this patch originally fixed that as well, but I've decided to split that change into a separate patch so as not to conflict with changes in another JSON PR.

How was this patch tested?

Existing unit tests, plus manual benchmarking on a production workload.
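For illustration, the code path this patch speeds up runs whenever CSV schema inference is requested; a minimal usage sketch (the input path is hypothetical):

```scala
// Schema inference forces an extra pass over the input files; after this
// patch that pass goes through the `text` data source instead of the
// legacy SparkContext.hadoopFile / hadoopRDD methods.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/path/to/many/csv/files")  // hypothetical location
df.printSchema()
```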